

use std::sync::{Arc};
use std::collections::HashMap;
use crate::source::{source_add_next, source_add_next_online};
use tokio::runtime::Runtime;
use crate::util::{AsyncSendData, async_data_channel_new, AsyncRecvData, SyncSender, SyncRecv, channel_new};
use crate::conf_init::{MapResult, MapErr, work_space_conf};
use crate::indicators::{indicators_key_ref, Op, IndicatorsMode, work_space_input_register};
use crate::logical_plan::work_node::conf::WorkNodeConf;
use crate::logical_plan::work_node::work_node_register_online;
use crate::util::dyn_runtime::dyn_new_runtime;
use tokio::sync::RwLock as TokioRwLock;
use tokio::sync::RwLockReadGuard as TokioRwLockReadGuard;
use tokio::sync::RwLockWriteGuard as TokioRwLockWriteGuard;
use once_cell::sync::OnceCell;
use crate::data_frame::DataInstance;

// Per-work_space debug switches (deserializable, carried inside WorkSpaceConf).
// Every switch is optional; how an absent value (None) is interpreted is up to
// the code that reads it, which is not part of this file.
#[derive(Deserialize,Clone,Debug)]
pub struct WorkSpaceDebug{
    /*
    The path an event takes as it flows through the work_nodes
    */
    pub event_flow_path: Option<bool>,
    /*
    Runtime errors raised while a work_node processes or evaluates an event
    */
    pub run_err: Option<bool>,
    /*
    Observability indicators:
        three timestamps related to event processing:
            1) the time the event enters the work_node's input queue
            2) the time processing of the event starts
            3) the time the node has finished the event and is about to send it to the next node
        count of events entering the work_node; count of events leaving the work_node;
        CPU clock-cycle statistics of the work_node
    */
    pub observability_indicator: Option<bool>,

}

// Configuration of a single work_space (deserializable).
#[derive(Deserialize,Debug)]
pub struct WorkSpaceConf{
    // Name of the work_space; used as its key in the global work_space map.
    pub name: String,
    // When false, no inputs are registered and no runtime is created for it.
    pub enable: bool,
    // Names of the sources this work_space registers itself with as a "next".
    input: Vec<String>,
    // Worker-thread count handed to the runtime builder for this work_space.
    threads: usize,
    // Optional debug switches; stored as-is in the WorkSpaceCtl.
    debug: Option<WorkSpaceDebug>,
}

// The runnable part of a work_space: its input receivers and downstream senders.
// It is moved out of its WorkSpaceCtl and consumed when the work_space is started.
pub struct WorkSpace{
    name: String,
    // Runtime on which one forwarding task per input is spawned.
    runtime: Arc<Runtime>,
    // Receiving end of the control channel (WorkSpaceMsg currently has no variants).
    ctl_recv: SyncRecv<WorkSpaceMsg>,
    // Source name -> receiver obtained when registering with that source.
    input: HashMap<String, AsyncRecvData>,
    // Downstream name -> sender; entries are added by work_space_add_next.
    next: HashMap<String, AsyncSendData>,
}
// Control block of a work_space; this is what the global work_space map stores.
pub struct WorkSpaceCtl{
    pub name: String,
    pub enable: bool,
    // None when the work_space is disabled.
    runtime: Option<Arc<Runtime>>,
    // Sending end of the control channel paired with WorkSpace::ctl_recv.
    ctl_send: SyncSender<WorkSpaceMsg>,
    // None when disabled; also becomes None once work_space_run_one has taken
    // it to start the forwarding tasks.
    work_space: Option<WorkSpace>,
    // Debug switches copied from the configuration, shared via Arc.
    debug: Arc<Option<WorkSpaceDebug>>,
}

// Control messages for a work_space. No variants are defined yet, so no value
// of this type can currently be constructed or sent.
pub enum WorkSpaceMsg{

}

// Global map: work_space name -> control block. It is populated lazily on the
// first call to work_space_list_read / work_space_list_write.
static WORK_SPACE_LIST: OnceCell<TokioRwLock<HashMap<String, WorkSpaceCtl>>> = OnceCell::new();

pub async fn work_space_list_read<'a>()->TokioRwLockReadGuard<'a, HashMap<String, WorkSpaceCtl>>{
    let tmp = WORK_SPACE_LIST.get();

    let tmp = if let Some(tmp) = tmp{
        tmp
    }else{
        let value = match work_space_list_init_from_conf().await{
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("sink_init_from_conf err: {:?}", e);
                std::process::exit(-1);
            },
        };
        WORK_SPACE_LIST.get_or_init(||{value})
    };

    tmp.read().await
}

pub async fn work_space_list_write<'a>()->TokioRwLockWriteGuard<'a, HashMap<String, WorkSpaceCtl>>{
    let tmp = WORK_SPACE_LIST.get();

    let tmp = if let Some(tmp) = tmp{
        tmp
    }else{
        let value = match work_space_list_init_from_conf().await{
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("sink_init_from_conf err: {:?}", e);
                std::process::exit(-1);
            },
        };
        WORK_SPACE_LIST.get_or_init(||{value})
    };

    tmp.write().await
}

/// Registers a new work_space online, followed by its internal work_nodes,
/// and then starts it.
///
/// Steps, in order:
/// 1. reject the request if a work_space with the same name already exists;
/// 2. build the control block from `work_space_conf` in online mode;
/// 3. insert it into the global work_space map;
/// 4. register `work_node_list` for it; on failure the entry inserted in
///    step 3 is removed again and the error is returned;
/// 5. start the work_space via `work_space_run_one`.
///
/// # Errors
/// Returns a `MapErr` when the name is already taken, when building the
/// control block fails or yields nothing, when work_node registration fails,
/// when the entry has disappeared from the map before it could be started,
/// or when starting it fails.
pub async fn work_space_register_online(work_space_conf: &WorkSpaceConf,
    work_node_list: &[WorkNodeConf])->MapResult<()>{

    info!("work_space_register_online ");

    // NOTE(review): the existence check and the insert below use two separate
    // lock acquisitions, so two concurrent registrations of the same name can
    // both pass this check. Confirm callers serialise registrations.
    {
        let work_space_ctl_list = work_space_list_read().await;
        if work_space_ctl_list.contains_key(work_space_conf.name.as_str()){
            return MapResult::Err(MapErr::new(format!("work_space [{}] already exists", work_space_conf.name)));
        }
    }

    // Report a missing control block as an error instead of panicking.
    let work_space_ctl = match work_space_init_from_conf(work_space_conf, true).await?{
        Some(ctl) => ctl,
        None => {
            let err_desc = format!("work_space [{}] register online, but init from conf returned None",
                                   work_space_conf.name);
            return MapResult::Err(MapErr::new(err_desc));
        },
    };

    // The write guard is released before work_node registration runs.
    {
        let mut work_space_ctl_list = work_space_list_write().await;
        work_space_ctl_list.insert(work_space_conf.name.clone(), work_space_ctl);
    }

    if let MapResult::Err(e) = work_node_register_online(work_space_conf.name.as_str(), work_node_list).await{
        // Roll back the insert so a half-registered work_space is not left behind.
        let mut work_space_ctl_list = work_space_list_write().await;
        let _removed = work_space_ctl_list.remove(&work_space_conf.name);
        return MapResult::Err(e);
    }

    let mut work_space_ctl_list = work_space_list_write().await;
    let result = match work_space_ctl_list.get_mut(&work_space_conf.name){
        Some(work_space_ctl) => work_space_run_one(work_space_ctl),
        None => {
            // The entry was removed between the insert and here; report it
            // rather than panicking on an unwrap.
            let err_desc = format!("work_space [{}] register online, but not exist when run",
                                   work_space_conf.name);
            MapResult::Err(MapErr::new(err_desc))
        },
    };
    result
}

/// Builds the multi-threaded tokio runtime for a work_space.
///
/// Worker threads are named `work_space_<work_space_name>` and their number
/// is `threads`.
///
/// # Errors
/// Returns a `MapErr` when `threads` is 0 or when tokio fails to build the
/// runtime.
fn work_space_runtime_build(work_space_name: &str, threads: usize)->MapResult<Runtime>{

    // tokio's `Builder::worker_threads` panics when given 0. Report a bad
    // configuration as an error instead of crashing the process.
    if threads == 0{
        let err_desc = format!("work_space [{}] runtime build failed [threads must be greater than 0]",
                               work_space_name);
        return MapResult::Err(MapErr::new(err_desc));
    }

    match tokio::runtime::Builder::new_multi_thread()
        .thread_name(format!("work_space_{}", work_space_name))
        .worker_threads(threads)
        .build(){
        Result::Ok(runtime) => MapResult::Ok(runtime),
        Result::Err(e) => {
            let err_desc = format!("work_space [{}] runtime build failed [{:?}]",
                                   work_space_name, e);
            MapResult::Err(MapErr::new(err_desc))
        },
    }
}

/// Builds the global work_space map from the configured work_space list.
///
/// Every configured work_space is initialised in offline mode
/// (`is_online = false`). The first initialisation error aborts the whole
/// build and is returned. With no configuration the map is empty.
async fn work_space_list_init_from_conf()
    ->MapResult<TokioRwLock<HashMap<String, WorkSpaceCtl>>>{

    info!("work_space_list_init_from_conf");

    let mut ctl_by_name = HashMap::new();

    let configured = work_space_conf();
    if let Some(conf_list) = configured{
        for ws_conf in conf_list.iter(){
            match work_space_init_from_conf(ws_conf, false).await?{
                Some(ctl) => {
                    ctl_by_name.insert(ws_conf.name.to_string(), ctl);
                },
                None => {},
            }
        }
    }

    MapResult::Ok(TokioRwLock::new(ctl_by_name))
}

/// Builds the control block for one work_space from its configuration.
///
/// For an enabled work_space this registers it as a "next" of every
/// configured input source, then creates its runtime, and bundles both into
/// a `WorkSpace`. With `is_online` the online variants of source
/// registration and runtime creation are used. A disabled work_space gets a
/// control block without runtime and without `WorkSpace`.
///
/// # Errors
/// Propagates the first error from source registration or from building the
/// runtime.
async fn work_space_init_from_conf(work_space_conf: &WorkSpaceConf, is_online: bool)
    ->MapResult<Option<WorkSpaceCtl>>{

    let ws_name = &work_space_conf.name;
    info!("work_space_init_from_conf [{}]", ws_name);

    let (ctl_send, ctl_recv) = channel_new();

    let (runtime, work_space) = if !work_space_conf.enable{
        (None, None)
    }else{
        // Inputs are registered first, in configuration order, before the
        // runtime is created.
        let mut receivers = HashMap::new();
        for input_name in work_space_conf.input.iter(){
            let receiver = if is_online{
                source_add_next_online(input_name, ws_name).await?
            }else{
                source_add_next(input_name, ws_name).await?
            };
            receivers.insert(input_name.to_string(), receiver);
        }

        let built = if is_online{
            dyn_new_runtime(ws_name, work_space_conf.threads)
        }else{
            work_space_runtime_build(ws_name, work_space_conf.threads)?
        };
        let shared_runtime = Arc::new(built);

        let space = WorkSpace{
            name: ws_name.to_string(),
            runtime: Arc::clone(&shared_runtime),
            ctl_recv,
            input: receivers,
            next: HashMap::new(),
        };
        (Some(shared_runtime), Some(space))
    };

    let ctl = WorkSpaceCtl{
        name: ws_name.to_string(),
        enable: work_space_conf.enable,
        runtime,
        ctl_send,
        work_space,
        debug: Arc::new(work_space_conf.debug.clone()),
    };

    info!("work_space_init_from_conf [{}] end", ws_name);
    MapResult::Ok(Some(ctl))
}

/// Attaches a downstream consumer named `next_name` to a work_space.
///
/// Returns `(Some(receiver), work_space_name)` for an enabled work_space,
/// where `receiver` is the receiving end of a new channel named
/// `<work_space_name>-<next_name>` whose sender is stored in the work_space.
/// Returns `(None, work_space_name)` when the work_space is disabled.
///
/// # Errors
/// Returns a `MapErr` when the work_space does not exist, or when it is
/// enabled but its `WorkSpace` is no longer present in the control block.
pub async fn work_space_add_next(work_space_name: &str, next_name: &str)
    ->MapResult<(Option<AsyncRecvData>,String)>{

    let mut ctl_list = work_space_list_write().await;

    let ctl = match ctl_list.get_mut(work_space_name){
        Some(ctl) => ctl,
        None => {
            let err_desc = format!("work_space [{}] add next, but not exist", work_space_name);
            return MapResult::Err(MapErr::new(err_desc));
        },
    };

    if !ctl.enable{
        return MapResult::Ok((None,work_space_name.to_string()));
    }

    match ctl.work_space.as_mut(){
        Some(space) => {
            let (sender, receiver) = async_data_channel_new(format!("{}-{}", work_space_name, next_name));
            space.next.insert(next_name.to_string(), sender);
            return MapResult::Ok((Some(receiver),work_space_name.to_string()));
        },
        None => {
            let err_desc = format!("work_space [{}] add next, but work_space_ctl work_space is None",
                                   work_space_name);
            return MapResult::Err(MapErr::new(err_desc));
        },
    }
}

/// Looks up the runtime of the work_space called `work_space_name`.
///
/// Returns `None` when no such work_space exists or when it has no runtime.
pub async fn work_space_runtime_by_name(work_space_name: &str)->Option<Arc<Runtime>>{
    let ctl_list = work_space_list_read().await;
    // Cloning the Option<Arc<_>> only bumps the reference count.
    let runtime = ctl_list
        .get(work_space_name)
        .and_then(|ctl| ctl.runtime.clone());
    runtime
}

pub async fn work_space_ctl_get(work_space_name: &str)->MapResult<(Arc<Runtime>,Arc<Option<WorkSpaceDebug>>)>{
    let work_space_ctl_list = work_space_list_write().await;

    if let Some(work_space_ctl) = work_space_ctl_list.get(work_space_name){
        if work_space_ctl.enable{
            if let Some(ref runtime) = work_space_ctl.runtime{
                return MapResult::Ok((Arc::clone(runtime), Arc::clone(&work_space_ctl.debug)));
            }
        }
        return MapResult::Err(MapErr::new(format!("work_space [{}] not enable", work_space_name)));
    }else{
        let err_desc = format!("work_space [{}] runtime, but not exist", work_space_name);
        return MapResult::Err(MapErr::new(err_desc));
    }
}

/// Starts every work_space in the global map.
///
/// The write guard is held for the whole pass. The first work_space that
/// fails to start aborts the pass and its error is returned; work_spaces
/// visited before it stay started.
pub async fn work_space_run()->MapResult<()>{
    info!("work_space_run");

    let mut ctl_list = work_space_list_write().await;
    for ctl in ctl_list.values_mut(){
        work_space_run_one(ctl)?;
    }
    MapResult::Ok(())
}

/// Starts a single work_space.
///
/// For every input of the work_space one future is started which receives
/// data from that input and sends it to all `next` consumers. The
/// `WorkSpace` is moved out of the control block in the process, so a second
/// call on the same enabled control block fails. A disabled work_space is
/// only logged and skipped.
///
/// # Errors
/// Returns a `MapErr` when the work_space is enabled but its `WorkSpace` is
/// `None`.
fn work_space_run_one(work_space_ctl: &mut WorkSpaceCtl)->MapResult<()>{

    info!("work_space_run_one [{}]", work_space_ctl.name);

    if !work_space_ctl.enable{
        info!("work_space [{}] not enable", work_space_ctl.name);
        return MapResult::Ok(());
    }

    match work_space_ctl.work_space.take(){
        Some(space) => {
            work_space_each(space);
            MapResult::Ok(())
        },
        None => {
            let err_desc = format!("work_space [{}] run, but is None", work_space_ctl.name);
            MapResult::Err(MapErr::new(err_desc))
        },
    }
}

/// Forwarding task for one input of a work_space; it never returns.
///
/// The input receiver is first registered with the indicators module. Then,
/// for every received item: the work_space "recv" indicator is incremented
/// by one, batch data gets every element's `metadata.work_space` set to this
/// work_space's name, and a clone of the item is sent to each `next` sender
/// in turn.
async fn work_space_input_each(work_space_name: String, input: AsyncRecvData, next: HashMap<String, AsyncSendData>){

    // Shared so that stamping each element is a reference-count bump only.
    let shared_name = Arc::new(work_space_name);
    work_space_input_register(shared_name.as_str(), input.clone()).await;

    loop{
        let mut received = match input.recv().await{
            Some(received) => received,
            // NOTE(review): a `None` simply retries. If `None` means the
            // channel is closed, this loop never ends - confirm against
            // AsyncRecvData::recv.
            None => continue,
        };

        indicators_key_ref(Op::Add, IndicatorsMode::WorkSpace, shared_name.as_str(), "recv", 1);

        if let DataInstance::BatchData(batch) = &mut received{
            for item in batch.iter_mut(){
                item.metadata.work_space = Arc::clone(&shared_name);
            }
        }

        for sender in next.values(){
            sender.send(received.clone()).await;
        }
    }
}
/// Spawns one forwarding task per input on the work_space's runtime.
///
/// Each task gets its own copy of the work_space name, a clone of its input
/// receiver and a clone of the complete `next` sender map. The returned join
/// handles are not kept.
fn work_space_each(work_space: WorkSpace){

    for receiver in work_space.input.values(){
        let task = work_space_input_each(
            work_space.name.to_string(),
            receiver.clone(),
            work_space.next.clone(),
        );
        work_space.runtime.spawn(task);
    }
}
